iT邦幫忙

2024 iThome 鐵人賽

DAY 17
0
DevOps

我獨自升級:從水管工走向 DataOps系列 第 17

【Day 17】用 Astronomer Cosmos 結合 dbt 和 Airflow - 再戰 Jaffle Shop

  • 分享至 

  • xImage
  •  

前言

今天要用最簡單的架構完成 Jaffle Shop 的 PoC(Proof of Concept,概念驗證),主要目的是讓大家能在自己的本機成功運行 Jaffle Shop 的 Cosmos 專案,進而熟悉 dbt 和 Airflow 的整合方式~/images/emoticon/emoticon42.gif

題外話:PoC、MVP、Prototype 的差異是什麼?

  1. PoC:主要是測試想法的可行性,可能只需要數天
  2. prototype:主要是展示UI/UX 或特定功能,可能需要數週
  3. MVP,minimum viable product 最小可行性產品,可能需要幾個月,基本上已經是半成品,主要是要測試市場的接受度
    流程:PoC => Prototype => MVP

本日專案程式碼

先決條件

  1. Git
  2. Python 3.9 以上
  3. Docker Desktop 或 Docker Compose

環境安裝

1. Clone 專案到本地

git clone  https://github.com/snhou/astronomer-cosmos-poc.git

2. 進入專案資料夾

cd astronomer-cosmos-poc

3. 開啟虛擬環境(非必要)

還是會希望開啟虛擬環境是因為在開發專案過程中,會用到一些 vscode 的插件或是測試,而這部分不一定都會進到正在運行中的 docker 去執行,其中一個範例就是明天會提到的 dbt 必備 vscode 插件 dbt Power User,基本上插件很難運用到 docker 環境,然後避免本機有多個環境,還是用虛擬環境最方便。

如果有人知道 dbt Power User 可以用 docker 環境的,拜託跟我說/images/emoticon/emoticon46.gif

  1. 開啟 venv 環境
python3 -m venv venv
  1. 進入 venv 環境
source venv/bin/activate
  1. 在虛擬環境安裝模組
python3 -m pip install -r requirements.txt

4. 運行 Docker 環境

執行之前記得打開 Docker Desktop!!

  1. 依照 Dockerfile 建立所需的 docker Image
docker compose build

如果過程中有修改 Dockerfile,建議加上 --no-cache 參數來重新建立

  1. 創建 docker 容器
docker compose up -d

-d代表要運行在背景

  1. docker ps 確認是否正常運行,都有看到 (healthy) 代表都沒問題!

正式執行

1. 進入 Docker 容器測試 dbt 運行

理論上,我們可以直接進到下一步開始執行 airflow 的 DAG,但既然前幾天有談到 dbt 的運行流程,進到 Docker 容器當中,除了可以複習一下,也能確定環境都沒問題/images/emoticon/emoticon34.gif

  1. dbt debug 確認連線資訊
dbt debug

https://ithelp.ithome.com.tw/upload/images/20241001/20135427G2Un6BRAs3.png

這部分為了讓 git 的部分不會有 error,可以在 Dockerfile 中看到有特別切換成 ROOT 來安裝 git

  1. dbt seed 載入靜態檔案
dbt seed

https://ithelp.ithome.com.tw/upload/images/20241001/20135427k5MilTQ2bT.png

  1. dbt run 執行 models/ 當中的 sql 轉換
dbt run

https://ithelp.ithome.com.tw/upload/images/20241001/20135427pJcELI66nu.png
4. dbt test 執行 models/ 當中 schema.yml 的資料測試

dbt test

https://ithelp.ithome.com.tw/upload/images/20241001/20135427WDl6ziKI5v.png

2. Airflow DAG 執行

開啟 localhost:8080 直接執行 simple_task_group DAG
https://ithelp.ithome.com.tw/upload/images/20241001/20135427aJQSwh87wa.png

程式碼說明

from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ProjectConfig
from include.profiles import airflow_db
from include.constants import jaffle_shop_path, venv_execution_config

@dag(
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False
)
def simple_task_group():
    pre_dbt = EmptyOperator(task_id="pre_dbt")
    jaffle_shop = DbtTaskGroup(
        group_id="my_jaffle_shop_project",
        project_config=ProjectConfig(jaffle_shop_path),
        profile_config=airflow_db,
        execution_config=venv_execution_config
    )
    post_dbt = EmptyOperator(task_id="post_dbt")
    
    pre_dbt >> jaffle_shop >> post_dbt

simple_task_group()
  • 創建了一個 jaffle_shop 的 TaskGroup
    • group_id: 這個 TaskGroup 的 ID。
    • project_config : 指定 dbt 專案的路徑。
    • profile_config : 使用 airflow_db 的設定來連接資料庫。
    • execution_config : 指定執行 dbt 時的路徑。
  • DbtTaskGroup 的前後有加上 EmptyOperator 的範例,可以依照 Data Pipeline 的需求串接需要的資料
  • 需要注意的部分:因為 include 放在 plugins 當中,而 plugins 在 docker 環境建置時就會載入,所以就可以直接用 from include.profiles import ...from include.constants import ... 載入所需的變數和函式,如果檔案不是在 plugins 當中的話,會需要修改 docker 環境設定

Jaffle Shop 專案結構

{Jaffle Shop}
├── dags
│   └── simple_task_group.py
├── dbt
│   └── jaffle_shop
│       ├── dbt_project.yml
│       ├── models
│       │   ├── marts
│       │   │   ├── orders.sql
│       │   │   └── schema.yml
│       │   └── staging
│       │       ├── schema.yml
│       │       ├── stg_orders.sql
│       │       └── stg_payments.sql
│       ├── profiles.yml
│       └── seeds
│           ├── raw_orders.csv
│           └── raw_payments.csv
├── Dockerfile
├── docker-compose.yaml
├── plugins
│   └── include
│       ├── constants.py
│       └── profiles.py
└── requirements.txt

從整個專案結構可以發現在 dbt/ 資料夾中,可以建立多個 dbt 專案,再透過 dag 執行不同專案目錄,就能夠達成在同一專案中進行不同資料庫的 dbt 資料轉換,如果只是單純想要讓 dag 執行特定的 model,也可以透過 tag 來分類執行

在 dbt 專案當中的 profiles.yml 其實以 airflow 來說是不必要的,因為已經從 profiles.py 設定好了,但是如果要像上方進入到 docker 容器中做測試,profiles.yml 還是不可少的,如果不想讓連接設定在多個不同地方,也可以透過環境變數統一控制

profiles.py 當中使用的 conn_id="airflow_metadata_db" ,正常是會到 airflow Web UI 上設定,但這次用的方式是直接寫在 docker-compose.yml 來設定,這樣如果有多個 db 設定的話,可以確保 docker 環境開啟後就能順利連接,設定方式如下,只需要在 conn_id 名稱前面加上 AIRFLOW_CONN_就可以了:

x-airflow-common:
  &airflow-common
  ...
  #image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    ...
    AIRFLOW_CONN_AIRFLOW_METADATA_DB: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

結語

明天就會介紹 dbt Power User,這個用 dbt 的朋友都愛不釋手的工具~~/images/emoticon/emoticon08.gif


上一篇
【Day 16】用 Astronomer Cosmos 結合 dbt 和 Airflow - 專案結構與環境
下一篇
【Day 18】dbt 專案必備插件 - dbt Power User
系列文
我獨自升級:從水管工走向 DataOps21
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言